package com.cctv.cndms.common.consumer;

import com.cctv.cndms.common.consumer.queue.ConsumerBlockingQueue;
import com.cctv.cndms.enums.ResultState;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Base class for message-queue consumers.
 *
 * <p>{@link #run()} repeatedly drains {@link #messageQueue}, passing every message to
 * {@link #parseMessage(Message)}, and sleeps for one second between drain passes. The loop
 * ends when {@link #stop()} has been called or when the executing thread is interrupted.
 *
 * @param <T> concrete message type handled by the subclass
 * @category Message queue consumer
 * @author heyingcheng
 * @email heyingcheng@ctvit.com.cn
 * @date 2018/4/12 15:32
 */
@Slf4j
public abstract class BaseConsumer<T extends Message> implements Runnable {

    /**
     * Queue this consumer reads from. It is not assigned anywhere in this class, so it must
     * be set (e.g. by the subclass) before {@link #run()} is executed.
     */
    protected ConsumerBlockingQueue messageQueue;

    /** Cleared by {@link #stop()}; volatile so the consumer thread observes the change. */
    private volatile boolean running = true;

    /**
     * Processes a single message taken from the queue.
     *
     * <p>Any exception thrown by an implementation is caught and logged by the consume loop,
     * which then carries on with the next message.
     *
     * @param t the message to process
     * @return the processing result; the consume loop currently does not act on it
     */
    protected abstract ResultState parseMessage(T t);

    /**
     * Asks the consume loop to terminate.
     *
     * <p>This only clears a flag and does not interrupt the consumer thread: the drain pass in
     * progress runs to completion and the loop exits after the following sleep (up to one
     * second later).
     */
    public void stop() {
        running = false;
    }

    /**
     * Consume loop: drain the queue, sleep one second, repeat until stopped or interrupted.
     *
     * <p>An interrupt received while draining or sleeping is recorded on the thread again
     * (the interrupt flag is restored) and ends the loop.
     */
    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            drainQueue();
            if (Thread.currentThread().isInterrupted()) {
                // Interrupted while draining: leave now instead of calling sleep(), which
                // would only throw immediately.
                break;
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                log.error("RedisQueue消费中断异常", e);
                // Restore the flag so the loop condition (and any caller) sees the interrupt;
                // previously it was swallowed here and the loop kept running.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Processes queued messages until the queue reports empty or the thread is interrupted.
     *
     * <p>The interrupt check in the loop condition is essential: after an interrupt, every
     * further blocking {@code take()} would fail straight away without consuming anything,
     * so a non-empty queue would otherwise make this loop spin forever.
     */
    @SuppressWarnings("unchecked") // cast to the type variable T cannot be checked at runtime
    private void drainQueue() {
        while (!Thread.currentThread().isInterrupted() && !messageQueue.isEmpty()) {
            T t = null;
            try {
                t = (T) messageQueue.take();
                log.info("消息消费者处理消息, message={}", t);
                parseMessage(t);
            } catch (InterruptedException e) {
                log.error("消息消费者执行中断异常", e);
                // Restore the interrupt flag; the loop condition then ends the drain pass.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // One failing message must not kill the consumer thread.
                log.error("消息消费者消费异常", e);
            } finally {
                if (t != null) {
                    // NOTE(review): with standard BlockingQueue semantics take() has already
                    // removed the element, making this a no-op. Kept because the semantics of
                    // ConsumerBlockingQueue are not visible here - confirm before removing.
                    messageQueue.remove(t);
                }
            }
        }
    }

}
